package com.demo.sync;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;

public class PullConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer("sync_producer_group_test1");
        defaultMQPullConsumer.setNamesrvAddr("localhost:9876");
        defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING);
        defaultMQPullConsumer.start();
        Set<MessageQueue> messageQueueSet = defaultMQPullConsumer.fetchSubscribeMessageQueues("sync_TopicTest");
        ThreadLocal<Long> offsetThreadLocal = ThreadLocal.withInitial(()->{
            MyThread myThread = (MyThread) Thread.currentThread();
            return myThread.getOffset();
        });

        for(MessageQueue messageQueue : messageQueueSet){
            Long offset = defaultMQPullConsumer.fetchConsumeOffset(messageQueue, true);
            new MyThread(offset,new Runnable() {
                @Override
                public void run() {
                    while (true){
                        try {
                            PullResult pullResult = defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, "*", offsetThreadLocal.get(), 1);
                            PullStatus statue = pullResult.getPullStatus();
                            switch (statue){
                                case FOUND:
                                    List<MessageExt> messages = pullResult.getMsgFoundList();
                                    for (int i = 0; i < messages.size(); i++) {
                                        MessageClientExt messageClientExt = (MessageClientExt) messages.get(i);
                                        System.out.printf(Thread.currentThread().getName() + " Receive New Messages: "
                                                + new String(messageClientExt.getBody(), Charset.forName("UTF-8")) + "%n");
                                    }
                                case NO_NEW_MSG:
                                    ;
                                default:
                                    ;
                            }
                            offsetThreadLocal.set(pullResult.getNextBeginOffset());
                            defaultMQPullConsumer.updateConsumeOffset(messageQueue,pullResult.getNextBeginOffset());
                        } catch (MQClientException e) {
                            e.printStackTrace();
                        } catch (RemotingException e) {
                            e.printStackTrace();
                        } catch (MQBrokerException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();

        }

    }
}
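/*
1. Fetch the queues assigned to the current consumer.
2. Assign one thread to each queue; each thread runs a while(true) loop.
3. Load balancing cannot be achieved this way; the consumer can only consume all of the queues,
   because when a new consumer joins, the current consumer does not fetch its queues again.

case BROADCASTING:
    this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    break;
case CLUSTERING:
    this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    break;
The location where offsets are stored is chosen according to the consumption (message) model.
So the offset should be persisted manually here; a ThreadLocal is not sufficient either.
A cache/store such as Redis or Memcached can be used to hold it.
 */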
